package com.companyname.demo.cod.scheduled;

import com.companyname.demo.cod.dao.ScheduledTaskInfoMapper;
import com.companyname.demo.cod.domain.ScheduledBizTaskInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 使用ScheduledThreadPoolExecutor：动态创建定时任务
 * 业务场景描述：
 * 从数据库捞出的一组任务中，每个任务的执行耗时差异很大，要求串行执行任务（涉及数据库的一致性），
 * 要求执行时间长的任务不阻塞其他任务按其正常rate执行
 * 实现分析：
 * 如果让它们使用同一个ThreadPoolExecutor执行器，任务保存在同一个queue中时，将线程数调高不能满足串行执行要求，
 * 将线程数调至和任务数一致又不能满足不阻塞要求，耗时长的任务最终会霸占池中所有的可用线程，导致其他任务都堵塞在线程池队列中，
 * 或者队列满后被线程池拒绝执行。
 * 实现方案：
 * 1.每个任务建立一个独有的ThreadPoolExecutor执行器，核心线程设置为1来保证串行执行。
 * 2.当任务执行时间大于往queue里塞入的速度时，queue的size会不断增大；当用户在页面修改task的属性时(status、period)，queue中的任务要及时生效，
 * 鉴于以上情形，在任务执行完后需要手动将该task从queue中强制删除
 */
@Service
@Slf4j
public class DynamicCreateScheduledTask {
    @Autowired
    private ScheduledTaskInfoMapper scheduledTaskInfoMapper;

    //拒绝策略：直接抛出拒绝异常
    private final RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
    //使用Map保证每个taskName单独一个Executor执行器，Executor按task分开，执行时间长的task不会影响其他task
    private final Map<String, Object> taskMappingExecutor = new HashMap<>();

    /**
     * 定时任务：
     * 10s定期从数据库中获取status=Y的任务集合(任务的属性status、period由用户在页面动态改变)，
     * 对每个任务进行包装(转换为Runnable类型的对象)，
     * 调用scheduleAtFixedRate()方法并设置period(rate)来添加任务到线程池中
     */
    @Scheduled(fixedRate = 10000)
    public void start() {
        log.info("--------定时任务统一处理开始执行！--------");
        //1.从数据库中获取status=Y的任务集合
        ScheduledBizTaskInfo queryVO = new ScheduledBizTaskInfo();
        queryVO.setStatus("Y");
        List<ScheduledBizTaskInfo> validBizTasksDB = scheduledTaskInfoMapper.select(queryVO);
        //2.遍历每个任务：
        validBizTasksDB.forEach((ScheduledBizTaskInfo eachBizTask) -> {
            //2.1 当前任务如果没有自己的TreadPoolExecutor则新建，如果有则从Map取出
            //这里没有直接使用ScheduledThreadPoolExecutor，因为要自定义实现ThreadPoolExecutor.afterExecute()方法来删除task
            CustomScheduledThreadPoolExecutor executor = null;
            String taskName = eachBizTask.getTaskName();
            if (taskMappingExecutor.get(taskName) == null) {//新建该任务的ThreadPoolExecutor
                ThreadFactory customThreadFactory = new ThreadFactory() {
                    private final AtomicInteger counter = new AtomicInteger(1);
                    @Override
                    public Thread newThread(Runnable r) {
                        //设置自定义名字
                        Thread thread = new Thread(r, "DynamicScheduledThreadPool-" + taskName + "#" + counter.getAndIncrement());
                        return thread;
                    }
                };
                //设置corePoolSize=1目的是为了让线程池里的该任务串行运行(注意如果corePoolSize>1，会并行运行。而corePoolSize=1时默认是串行工作)
                executor = new CustomScheduledThreadPoolExecutor(1, customThreadFactory, rejectedExecutionHandler);
                //Map中保存一份
                taskMappingExecutor.put(taskName, executor);
            } else {
                executor = (CustomScheduledThreadPoolExecutor) taskMappingExecutor.get(taskName);
            }

            //2.2 将业务任务bizTask包装为FutureTask类型的任务（scheduleAtFixedRate方法入参要求）
            CustomScheduledFutureTask wapperTask = new CustomScheduledFutureTask(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    //还没搞懂这个回调方法怎么用...
                    return null;
                }
            });
            wapperTask.setTaskId(eachBizTask.getId());
            wapperTask.setTaskName(eachBizTask.getTaskName());

            //2.3 将任务塞入queue
            ScheduledFuture<?> scheduledFuture = executor.scheduleAtFixedRate(wapperTask, 0L, eachBizTask.getRateSeconds(), TimeUnit.SECONDS);
        });//foreach over

    }


}
